package com.dbflow5.reactivestreams.query;

import com.dbflow5.config.DBFlowDatabase;
import com.dbflow5.config.FlowLog;
import com.dbflow5.database.DatabaseWrapper;
import com.dbflow5.query.ModelQueriable;
import com.dbflow5.query.list.FlowCursorIterator;
import com.dbflow5.query.list.FlowCursorList;
import com.dbflow5.reactivestreams.transaction.TransactionObservable;
import com.dbflow5.transaction.Transaction;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Wraps a {@link ModelQueriable} into a {@link Flowable} that emits each item from the
 * result of the {@link ModelQueriable} one at a time.
 *
 * <p>Each call to {@link Subscription#request(long)} on the subscription handed to the
 * subscriber starts an asynchronous database transaction that loads a
 * {@link FlowCursorList} and forwards its rows to the subscriber.
 *
 * @param <T> the model type produced by the query
 */
public class CursorListFlowable<T> extends Flowable<T> {

    private final ModelQueriable<T> modelQueriable;
    private final DBFlowDatabase database;

    /**
     * @param modelQueriable the query whose cursor list is emitted
     * @param database       the database the asynchronous transaction is started on
     */
    public CursorListFlowable(ModelQueriable<T> modelQueriable, DBFlowDatabase database) {
        this.modelQueriable = modelQueriable;
        this.database = database;
    }

    /**
     * Hands the subscriber a {@link Subscription} whose {@code request(n)} runs the query in an
     * asynchronous transaction and whose {@code cancel()} cancels the most recently started
     * transaction.
     *
     * @param subscriber the downstream subscriber receiving the rows
     */
    @Override
    protected void subscribeActual(Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            // Most recently started transaction; null until the first valid request.
            private Transaction<FlowCursorList<T>> transaction = null;

            @Override
            public void request(long n) {
                // Reactive Streams §3.9: non-positive demand must be reported through onError
                // with an IllegalArgumentException instead of being silently ignored.
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException(
                            "§3.9 violated: positive request amount required but it was " + n));
                    return;
                }
                // NOTE(review): every request(n) starts a new transaction and a new observer
                // whose emitted counter begins at 0, so each request iterates from position 0
                // again. Confirm whether bounded demand is meant to resume where the previous
                // request stopped.
                Transaction.Builder<FlowCursorList<T>> builder = database.beginTransactionAsync((Function<DatabaseWrapper, FlowCursorList<T>>) databaseWrapper -> modelQueriable.cursorList(databaseWrapper));
                TransactionObservable.SingleTransaction<FlowCursorList<T>> single = TransactionObservable.asSingle(builder);
                transaction = single.transaction;
                single.subscribe(new CursorResultObserver<T>(subscriber, n));
            }

            @Override
            public void cancel() {
                if (transaction != null) {
                    transaction.cancel();
                }
            }
        });
    }

    /**
     * Receives the {@link FlowCursorList} produced by the transaction and pushes up to
     * {@code count} of its items to the wrapped subscriber, completing the subscriber when the
     * list runs out before the limit is reached.
     *
     * @param <T> the model type contained in the cursor list
     */
    public static class CursorResultObserver<T> implements SingleObserver<FlowCursorList<T>> {
        private final Subscriber<? super T> subscriber;
        private final long count;

        private final AtomicLong emitted = new AtomicLong();
        private final AtomicLong requested = new AtomicLong();
        private Disposable disposable = null;

        /**
         * @param subscriber the downstream subscriber to forward items and terminal signals to
         * @param count      the number of items requested by the downstream
         */
        public CursorResultObserver(Subscriber<? super T> subscriber, long count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            this.disposable = disposable;
        }

        /**
         * Iterates the cursor list and forwards its items downstream.
         *
         * <p>At most one terminal signal ({@code onComplete} or {@code onError}) is delivered;
         * once it has been sent the method returns instead of re-entering the emission loop.
         *
         * @param ts the cursor list loaded by the transaction
         */
        @Override
        public void onSuccess(@NonNull FlowCursorList<T> ts) {
            long starting;
            if (count == Long.MAX_VALUE && requested.compareAndSet(0, Long.MAX_VALUE)) {
                starting = 0;
            } else {
                starting = emitted.get();
            }

            long limit = count + starting;

            while (limit > 0) {
                // True once a terminal signal has been handed to the subscriber in this pass.
                boolean terminated = false;
                FlowCursorIterator<T> iterator = ts.iterator(starting, limit);
                try {
                    long i = 0;
                    while (isActive() && iterator.hasNext() && i++ < limit) {
                        subscriber.onNext(iterator.next());
                    }
                    emitted.addAndGet(i);
                    if (isActive() && i < limit) {
                        // no more items
                        terminated = true;
                        subscriber.onComplete();
                    } else {
                        limit = requested.addAndGet(-limit);
                    }
                } catch (Exception e) {
                    FlowLog.logError(e);
                    if (!terminated) {
                        terminated = true;
                        subscriber.onError(e);
                    }
                } finally {
                    try {
                        iterator.close();
                    } catch (Exception e) {
                        FlowLog.logError(e);
                        // Only signal the close failure when no terminal event was sent yet;
                        // a second terminal signal would break the subscriber contract.
                        if (!terminated) {
                            terminated = true;
                            subscriber.onError(e);
                        }
                    }
                }
                if (terminated) {
                    // Without this exit a failure would leave limit unchanged and the loop
                    // would re-open the iterator and repeat the emission indefinitely.
                    return;
                }
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            subscriber.onError(e);
        }

        /** Returns true when a disposable has been received and it is not disposed. */
        private boolean isActive() {
            return disposable != null && !disposable.isDisposed();
        }
    }
}
